# Copyright (c) 2018, Ansible Project
# Copyright (c) 2018, Abhijeet Kasurde <akasurde@redhat.com>
#
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import annotations

import os
from unittest.mock import patch, Mock

from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import (
    ModuleTestCase,
    set_module_args,
)
from ansible.module_utils.basic import AnsibleModule  # noqa: F401 # pylint: disable=unused-import
from ansible_collections.community.general.plugins.modules.java_keystore import JavaKeystore, create_module


class TestCreateJavaKeystore(ModuleTestCase):
    """Test the creation of a Java keystore.

    ``setUp`` replaces the module's ``create_file``/``create_path`` helpers,
    ``JavaKeystore.current_type``, several ``AnsibleModule`` methods and
    ``os.path.exists`` with mocks.  Each test then scripts the mocked return
    values and checks what ``JavaKeystore.create`` returns or hands over to
    ``fail_json``.
    """

    # Dotted prefixes used to build the patch targets in ``setUp``.
    _MODULE = "ansible_collections.community.general.plugins.modules.java_keystore"
    _ANSIBLE_MODULE = "ansible.module_utils.basic.AnsibleModule"

    def _start_patch(self, target, **kwargs):
        """Start a patcher for ``target`` and register its ``stop`` as a cleanup.

        Registering the cleanup right after ``start()`` guarantees the patch is
        undone even when a later statement in ``setUp`` raises; ``tearDown`` is
        not run in that situation, cleanups are.

        :param target: dotted path handed to ``unittest.mock.patch``.
        :param kwargs: extra keyword arguments forwarded to ``patch``.
        :returns: ``(patcher, mock)`` tuple.
        """
        patcher = patch(target, **kwargs)
        mocked = patcher.start()
        self.addCleanup(patcher.stop)
        return patcher, mocked

    def setUp(self):
        """Setup."""
        super().setUp()

        # Keep a handle on the real implementation before it gets patched so
        # that every path except the keystore destination is still resolved.
        orig_exists = os.path.exists

        self.mock_run_command, self.run_command = self._start_patch(f"{self._ANSIBLE_MODULE}.run_command")
        self.mock_get_bin_path, self.get_bin_path = self._start_patch(f"{self._ANSIBLE_MODULE}.get_bin_path")
        self.mock_preserved_copy, self.preserved_copy = self._start_patch(f"{self._ANSIBLE_MODULE}.preserved_copy")
        self.mock_atomic_move, self.atomic_move = self._start_patch(f"{self._ANSIBLE_MODULE}.atomic_move")
        self.mock_create_file, self.create_file = self._start_patch(f"{self._MODULE}.create_file")
        self.mock_create_path, self.create_path = self._start_patch(f"{self._MODULE}.create_path")
        self.mock_current_type, self.current_type = self._start_patch(f"{self._MODULE}.JavaKeystore.current_type")
        self.mock_selinux_context, self.selinux_context = self._start_patch(
            f"{self._ANSIBLE_MODULE}.selinux_context",
            side_effect=lambda path: ["unconfined_u", "object_r", "user_home_t", "s0"],
        )
        self.mock_is_special_selinux_path, self.is_special_selinux_path = self._start_patch(
            f"{self._ANSIBLE_MODULE}.is_special_selinux_path", side_effect=lambda path: (False, None)
        )
        # Patched last so that none of the patchers above is started while
        # os.path.exists is already replaced.
        self.mock_os_path_exists, self.os_path_exists = self._start_patch(
            "os.path.exists", side_effect=lambda path: True if path == "/path/to/keystore.jks" else orig_exists(path)
        )

    def _create_module(self, **extra_args):
        """Return a module built from the common arguments plus ``extra_args``."""
        module_args = dict(
            certificate="cert-foo",
            private_key="private-foo",
            dest="/path/to/keystore.jks",
            name="test",
            password="changeit",
        )
        module_args.update(extra_args)
        with set_module_args(module_args):
            return create_module()

    def _script_mocks(self, pkcs12_path, cert_path, key_path, command_results):
        """Queue up the return values of the patched helpers.

        :param pkcs12_path: sole value returned by the mocked ``create_path``.
        :param cert_path: first value returned by the mocked ``create_file``.
        :param key_path: second value returned by the mocked ``create_file``.
            A third value, the empty string, is always queued after it.
        :param command_results: ``(rc, stdout, stderr)`` tuples returned by
            successive ``run_command`` calls.
        """
        self.create_path.side_effect = [pkcs12_path]
        self.create_file.side_effect = [cert_path, key_path, ""]
        self.run_command.side_effect = command_results
        self.get_bin_path.side_effect = ["keytool", "openssl", ""]

    def test_create_jks_success(self):
        """``create`` reports a change when both mocked commands return 0."""
        module = self._create_module()
        self._script_mocks(
            pkcs12_path="/tmp/tmpgrzm2ah7",
            cert_path="/tmp/etacifitrec",
            key_path="/tmp/yek_etavirp",
            command_results=[(0, "", ""), (0, "", "")],
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            assert jks.create() == {
                "changed": True,
                "cmd": [
                    "keytool",
                    "-importkeystore",
                    "-destkeystore",
                    "/path/to/keystore.jks",
                    "-srckeystore",
                    "/tmp/tmpgrzm2ah7",
                    "-srcstoretype",
                    "pkcs12",
                    "-alias",
                    "test",
                    "-noprompt",
                ],
                "msg": "",
                "rc": 0,
            }

    def test_create_jks_keypass_fail_export_pkcs12(self):
        """A failing first command with a key passphrase is reported via ``fail_json``."""
        module = self._create_module(private_key_passphrase="passphrase-foo")
        module.exit_json = Mock()
        module.fail_json = Mock()
        self._script_mocks(
            pkcs12_path="/tmp/tmp1cyp12xa",
            cert_path="/tmp/tmpvalcrt32",
            key_path="/tmp/tmpwh4key0c",
            command_results=[(1, "", "Oops"), (0, "", "")],
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            jks.create()
            # With a passphrase set, the expected command additionally ends
            # with "-passin stdin".
            module.fail_json.assert_called_once_with(
                cmd=[
                    "openssl",
                    "pkcs12",
                    "-export",
                    "-name",
                    "test",
                    "-in",
                    "/tmp/tmpvalcrt32",
                    "-inkey",
                    "/tmp/tmpwh4key0c",
                    "-out",
                    "/tmp/tmp1cyp12xa",
                    "-passout",
                    "stdin",
                    "-passin",
                    "stdin",
                ],
                msg="",
                err="Oops",
                rc=1,
            )

    def test_create_jks_fail_export_pkcs12(self):
        """A failing first command without a key passphrase is reported via ``fail_json``."""
        module = self._create_module()
        module.exit_json = Mock()
        module.fail_json = Mock()
        self._script_mocks(
            pkcs12_path="/tmp/tmp1cyp12xa",
            cert_path="/tmp/tmpvalcrt32",
            key_path="/tmp/tmpwh4key0c",
            command_results=[(1, "", "Oops"), (0, "", "")],
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            jks.create()
            module.fail_json.assert_called_once_with(
                cmd=[
                    "openssl",
                    "pkcs12",
                    "-export",
                    "-name",
                    "test",
                    "-in",
                    "/tmp/tmpvalcrt32",
                    "-inkey",
                    "/tmp/tmpwh4key0c",
                    "-out",
                    "/tmp/tmp1cyp12xa",
                    "-passout",
                    "stdin",
                ],
                msg="",
                err="Oops",
                rc=1,
            )

    def test_create_jks_fail_import_key(self):
        """A failing second command is reported via ``fail_json`` with the keytool command."""
        module = self._create_module()
        module.exit_json = Mock()
        module.fail_json = Mock()
        self._script_mocks(
            pkcs12_path="/tmp/tmpgrzm2ah7",
            cert_path="/tmp/etacifitrec",
            key_path="/tmp/yek_etavirp",
            command_results=[(0, "", ""), (1, "", "Oops")],
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            jks.create()
            module.fail_json.assert_called_once_with(
                cmd=[
                    "keytool",
                    "-importkeystore",
                    "-destkeystore",
                    "/path/to/keystore.jks",
                    "-srckeystore",
                    "/tmp/tmpgrzm2ah7",
                    "-srcstoretype",
                    "pkcs12",
                    "-alias",
                    "test",
                    "-noprompt",
                ],
                msg="",
                err="Oops",
                rc=1,
            )


class TestCertChanged(ModuleTestCase):
    """Test if the cert has changed.

    ``setUp`` replaces the module's ``create_file`` helper,
    ``JavaKeystore.current_type`` and several ``AnsibleModule`` methods with
    mocks.  Each test scripts two ``run_command`` results and checks what
    ``JavaKeystore.cert_changed`` returns or hands over to ``fail_json``.
    """

    # Dotted prefixes used to build the patch targets in ``setUp``.
    _MODULE = "ansible_collections.community.general.plugins.modules.java_keystore"
    _ANSIBLE_MODULE = "ansible.module_utils.basic.AnsibleModule"

    def _start_patch(self, target, **kwargs):
        """Start a patcher for ``target`` and register its ``stop`` as a cleanup.

        Registering the cleanup right after ``start()`` guarantees the patch is
        undone even when a later statement in ``setUp`` raises; ``tearDown`` is
        not run in that situation, cleanups are.

        :param target: dotted path handed to ``unittest.mock.patch``.
        :param kwargs: extra keyword arguments forwarded to ``patch``.
        :returns: ``(patcher, mock)`` tuple.
        """
        patcher = patch(target, **kwargs)
        mocked = patcher.start()
        self.addCleanup(patcher.stop)
        return patcher, mocked

    def setUp(self):
        """Setup."""
        super().setUp()
        self.mock_run_command, self.run_command = self._start_patch(f"{self._ANSIBLE_MODULE}.run_command")
        self.mock_create_file, self.create_file = self._start_patch(f"{self._MODULE}.create_file")
        self.mock_get_bin_path, self.get_bin_path = self._start_patch(f"{self._ANSIBLE_MODULE}.get_bin_path")
        self.mock_current_type, self.current_type = self._start_patch(f"{self._MODULE}.JavaKeystore.current_type")
        self.mock_preserved_copy, self.preserved_copy = self._start_patch(f"{self._ANSIBLE_MODULE}.preserved_copy")
        self.mock_atomic_move, self.atomic_move = self._start_patch(f"{self._ANSIBLE_MODULE}.atomic_move")

    def _create_module(self):
        """Return a module built from the arguments shared by every test here."""
        module_args = dict(
            certificate="cert-foo",
            private_key="private-foo",
            dest="/path/to/keystore.jks",
            name="foo",
            password="changeit",
        )
        with set_module_args(module_args):
            return create_module()

    def _script_mocks(self, cert_path, command_results, current_type=None):
        """Queue up the return values of the patched helpers.

        :param cert_path: first value returned by the mocked ``create_file``;
            the empty string is always queued after it.
        :param command_results: ``(rc, stdout, stderr)`` tuples returned by
            successive ``run_command`` calls.
        :param current_type: when given, the single value returned by the
            mocked ``JavaKeystore.current_type``; when ``None`` that mock keeps
            its default behaviour.
        """
        self.create_file.side_effect = [cert_path, ""]
        self.run_command.side_effect = command_results
        self.get_bin_path.side_effect = ["keytool", "openssl", ""]
        if current_type is not None:
            self.current_type.side_effect = [current_type]

    def test_cert_unchanged_same_fingerprint(self):
        """Identical fingerprints in both command outputs mean no change."""
        module = self._create_module()
        self._script_mocks(
            cert_path="/tmp/placeholder",
            command_results=[(0, "foo=abcd:1234:efgh", ""), (0, "SHA256: abcd:1234:efgh", "")],
            current_type="jks",
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            result = jks.cert_changed()
            self.assertFalse(result, "Fingerprint is identical")

    def test_cert_changed_fingerprint_mismatch(self):
        """Different fingerprints in the two command outputs mean a change."""
        module = self._create_module()
        self._script_mocks(
            cert_path="/tmp/placeholder",
            command_results=[(0, "foo=abcd:1234:efgh", ""), (0, "SHA256: wxyz:9876:stuv", "")],
            current_type="jks",
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            result = jks.cert_changed()
            self.assertTrue(result, "Fingerprint mismatch")

    def test_cert_changed_alias_does_not_exist(self):
        """A keytool "Alias <foo> does not exist" error counts as a change."""
        module = self._create_module()
        self._script_mocks(
            cert_path="/tmp/placeholder",
            command_results=[
                (0, "foo=abcd:1234:efgh", ""),
                (1, "keytool error: java.lang.Exception: Alias <foo> does not exist", ""),
            ],
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            result = jks.cert_changed()
            self.assertTrue(result, "Alias mismatch detected")

    def test_cert_changed_password_mismatch(self):
        """A keytool "Keystore password was incorrect" error counts as a change."""
        module = self._create_module()
        self._script_mocks(
            cert_path="/tmp/placeholder",
            command_results=[
                (0, "foo=abcd:1234:efgh", ""),
                (1, "keytool error: java.io.IOException: Keystore password was incorrect", ""),
            ],
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            result = jks.cert_changed()
            self.assertTrue(result, "Password mismatch detected")

    def test_cert_changed_fail_read_cert(self):
        """A failing first command is reported via ``fail_json`` with the openssl command."""
        module = self._create_module()
        module.exit_json = Mock()
        module.fail_json = Mock()
        self._script_mocks(
            cert_path="/tmp/tmpdj6bvvme",
            command_results=[(1, "", "Oops"), (0, "SHA256: wxyz:9876:stuv", "")],
            current_type="jks",
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            jks.cert_changed()
            module.fail_json.assert_called_once_with(
                cmd=["openssl", "x509", "-noout", "-in", "/tmp/tmpdj6bvvme", "-fingerprint", "-sha256"],
                msg="",
                err="Oops",
                rc=1,
            )

    def test_cert_changed_fail_read_keystore(self):
        """A failing second command is reported via ``fail_json`` with the keytool command."""
        module = self._create_module()
        module.exit_json = Mock()
        module.fail_json = Mock(return_value=True)
        self._script_mocks(
            cert_path="/tmp/placeholder",
            command_results=[(0, "foo: wxyz:9876:stuv", ""), (1, "", "Oops")],
        )

        with patch("os.remove", return_value=True):
            jks = JavaKeystore(module)
            jks.cert_changed()
            # assert_called_with only inspects the most recent call.
            # NOTE(review): presumably fail_json is hit more than once on this
            # path (the first output uses "foo:" rather than "foo="), which
            # would explain why this is not assert_called_once_with - confirm
            # against the module before tightening.
            module.fail_json.assert_called_with(
                cmd=["keytool", "-list", "-alias", "foo", "-keystore", "/path/to/keystore.jks", "-v"],
                msg="",
                err="Oops",
                rc=1,
            )
